1   /**
2    * Copyright 2014 Netflix, Inc.
3    * 
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    * 
8    * http://www.apache.org/licenses/LICENSE-2.0
9    * 
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  package rx.internal.operators;
17  
18  
19  import rx.Observable;
20  import rx.Observable.Operator;
21  import rx.Subscriber;
22  import rx.functions.Func1;
23  
24  /**
25   * Returns an {@link Observable} that emits <code>true</code> if any element of
26   * an observable sequence satisfies a condition, otherwise <code>false</code>.
27   */
28  public final class OperatorAny<T> implements Operator<Boolean, T> {
29      private final Func1<? super T, Boolean> predicate;
30      private final boolean returnOnEmpty;
31  
32      public OperatorAny(Func1<? super T, Boolean> predicate, boolean returnOnEmpty) {
33          this.predicate = predicate;
34          this.returnOnEmpty = returnOnEmpty;
35      }
36  
37      @Override
38      public Subscriber<? super T> call(final Subscriber<? super Boolean> child) {
39          Subscriber<T> s = new Subscriber<T>() {
40              boolean hasElements;
41              boolean done;
42  
43              @Override
44              public void onNext(T t) {
45                  hasElements = true;
46                  boolean result = predicate.call(t);
47                  if (result && !done) {
48                      done = true;
49                      child.onNext(!returnOnEmpty);
50                      child.onCompleted();
51                      unsubscribe();
52                  } else {
53                      // if we drop values we must replace them upstream as downstream won't receive and request more
54                      request(1);
55                  }
56              }
57  
58              @Override
59              public void onError(Throwable e) {
60                  child.onError(e);
61              }
62  
63              @Override
64              public void onCompleted() {
65                  if (!done) {
66                      done = true;
67                      if (hasElements) {
68                          child.onNext(false);
69                      } else {
70                          child.onNext(returnOnEmpty);
71                      }
72                      child.onCompleted();
73                  }
74              }
75  
76          };
77          child.add(s);
78          return s;
79      }
80  }